Conversation
Force-pushed 98127fc to e551e65
Introduces a scheduler with four layered admission constraints:

- Total concurrency: hard global cap on running jobs
- Priority tiers: weighted share of total concurrency per tier (Priority struct with Level + Weight)
- Per-type limits: fraction of tier slots per job type
- Conflict groups: mutual exclusion by group + job ID

Within a tier, jobs are ordered by accumulated cost per fairness key (lowest first), with cost estimated via an EMA of wall time.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed 14e8f8a to ccf88e4
Add per-job admission cost floor to prevent volume-based DoS by charging a minimum fairness cost regardless of actual job duration.

Deduplicate jobs by (type, id): Submit silently drops duplicates; RunSync coalesces callers onto the existing job. Sync and async jobs can be mixed on the same key. When all RunSync waiters cancel and no Submit owns the job, the job's context is cancelled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
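The (type, id) deduplication described here might look like the sketch below. Names (`dedupTable`, `jobKey`, `submit`, `runSync`) are invented for illustration; only the behavior (Submit drops duplicates, RunSync coalesces waiters onto an existing job) comes from the commit message.

```go
package main

import (
	"fmt"
	"sync"
)

// jobKey identifies a job for deduplication; names are illustrative.
type jobKey struct {
	Type string
	ID   string
}

type job struct {
	key     jobKey
	waiters []chan error // RunSync callers coalesced onto this job
}

type dedupTable struct {
	mu     sync.Mutex
	active map[jobKey]*job
}

// submit registers a job unless one with the same key is already active;
// it reports whether the job was newly added (duplicates are dropped).
func (d *dedupTable) submit(k jobKey) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.active[k]; ok {
		return false // duplicate: silently dropped
	}
	d.active[k] = &job{key: k}
	return true
}

// runSync coalesces the caller onto an existing job (creating one if
// needed), returning the channel the caller would wait on.
func (d *dedupTable) runSync(k jobKey) <-chan error {
	d.mu.Lock()
	defer d.mu.Unlock()
	j, ok := d.active[k]
	if !ok {
		j = &job{key: k}
		d.active[k] = j
	}
	done := make(chan error, 1)
	j.waiters = append(j.waiters, done)
	return done
}

func main() {
	d := &dedupTable{active: make(map[jobKey]*job)}
	k := jobKey{Type: "reindex", ID: "repo-1"}
	fmt.Println(d.submit(k)) // true: first submission wins
	fmt.Println(d.submit(k)) // false: duplicate dropped
	d.runSync(k)             // sync caller coalesces onto the same job
	fmt.Println(len(d.active[k].waiters))
}
```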
Force-pushed ccf88e4 to 235e5b2
    // Scheduler implements weighted fair queuing with conflict exclusion.
    type Scheduler struct {
        mu         sync.Mutex
        priorities map[Priority]bool
If Priority includes both Level and Weight, does this break the invariant that same-level means same-tier? You could have multiple entries with the same Level (and different Weights) at once, as separate map keys.
    s.removeWaiterLocked(j, done)
    s.maybeRemoveJobLocked(j)
    s.mu.Unlock()
    return errors.WithStack(ctx.Err())
Suggested change:

    return errors.WithStack(ctx.Err())
    case <-s.ctx.Done():
        return errors.WithStack(s.ctx.Err())
    s.mu.Unlock()

    for _, j := range toRun {
        go s.executeJob(j)
amp review for your consideration:
Close() doesn't wait for executing jobs. s.wg only tracks dispatchLoop and cleanupLoop. Jobs launched via go s.executeJob(j) aren't tracked, so Close() returns while jobs are still running. This can cause data corruption or lost waiter signals during shutdown. Fix: s.wg.Add(1) before go s.executeJob(j), defer s.wg.Done() inside executeJob.
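The suggested fix could be sketched as below: count each job in the WaitGroup before its goroutine starts, so Close() observes it. The surrounding types and the `dispatch` helper are invented for illustration; only the Add/Done placement mirrors the review comment.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// scheduler is a pared-down stand-in for the real type; only the
// WaitGroup accounting pattern is the point of this sketch.
type scheduler struct {
	wg sync.WaitGroup
}

func (s *scheduler) dispatch(jobs []func()) {
	for _, fn := range jobs {
		s.wg.Add(1) // count the job before the goroutine starts
		go s.executeJob(fn)
	}
}

func (s *scheduler) executeJob(fn func()) {
	defer s.wg.Done() // always release the count, even on early return
	fn()
}

// Close blocks until all tracked goroutines, including jobs, finish.
func (s *scheduler) Close() {
	s.wg.Wait()
}

func main() {
	s := &scheduler{}
	done := false
	s.dispatch([]func(){func() {
		time.Sleep(10 * time.Millisecond)
		done = true
	}})
	s.Close()
	fmt.Println(done) // wg.Wait guarantees the job ran to completion
}
```

Calling `wg.Add(1)` in the dispatching goroutine (not inside `executeJob`) matters: if the Add happened inside the new goroutine, Close() could call Wait before the goroutine was scheduled and return early anyway.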
    j.waiters = nil
    s.recordMetricsLocked()
    s.mu.Unlock()
amp suggests defer rather than inlining this section
Panic in j.fn() permanently leaks a concurrency slot. If the job function panics, the cleanup code (removing from running, deleting from active, signaling waiters) never runs. The slot is permanently occupied and waiters deadlock. Should use defer for the cleanup path and consider recover().
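A panic-safe version of that cleanup path might look like this sketch: the slot release runs in a defer with a recover(), so a panicking job function cannot leak its concurrency slot. `slotPool` and its fields are illustrative names, not from the patch.

```go
package main

import (
	"fmt"
	"sync"
)

// slotPool is a minimal stand-in for the scheduler's slot accounting.
type slotPool struct {
	mu      sync.Mutex
	running int
}

func (p *slotPool) executeJob(fn func()) {
	p.mu.Lock()
	p.running++ // occupy a concurrency slot
	p.mu.Unlock()

	defer func() {
		// Cleanup runs whether fn returned normally or panicked.
		if r := recover(); r != nil {
			fmt.Println("job panicked:", r)
		}
		p.mu.Lock()
		p.running-- // release the slot so waiters can't deadlock
		p.mu.Unlock()
	}()

	fn()
}

func main() {
	p := &slotPool{}
	p.executeJob(func() { panic("boom") })
	fmt.Println("running after panic:", p.running)
}
```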
    jobsTotal   metric.Int64Counter
    jobDuration metric.Float64Histogram
it doesn't look like these are used anywhere?
Summary
Adds a weighted fair queuing scheduler with conflict exclusion. All work (foreground and background) flows through a single scheduler that controls admission via four layered constraints:
1. Total concurrency: hard global cap on running jobs (Config.TotalConcurrency)
2. Priority tiers: weighted share of total concurrency per tier (Priority.Weight / sum(weights) * total). Higher-level tiers dispatch first, preventing background from starving foreground
3. Per-type limits: fraction of tier slots per job type (JobTypeConfig.MaxConcurrency, 0-1)
4. Conflict groups: mutual exclusion by group + job ID

Within a tier, jobs are ordered by accumulated cost per fairness key (lowest first, then arrival time). Cost is estimated via an exponential moving average of observed wall time.
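The intra-tier ordering could be sketched as a simple two-key sort: lowest accumulated cost first, arrival order as the tiebreaker. `queuedJob` and its fields are illustrative names, not the PR's actual types.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedJob holds the two keys the tier ordering compares.
type queuedJob struct {
	fairnessKey string
	cost        float64 // accumulated cost charged to the fairness key
	arrival     int64   // monotonically increasing submission counter
}

// orderTier sorts jobs within one tier: lowest cost first, then by
// arrival time for jobs with equal cost.
func orderTier(jobs []queuedJob) {
	sort.Slice(jobs, func(i, j int) bool {
		if jobs[i].cost != jobs[j].cost {
			return jobs[i].cost < jobs[j].cost
		}
		return jobs[i].arrival < jobs[j].arrival
	})
}

func main() {
	jobs := []queuedJob{
		{fairnessKey: "tenant-b", cost: 5, arrival: 1},
		{fairnessKey: "tenant-a", cost: 2, arrival: 3},
		{fairnessKey: "tenant-c", cost: 2, arrival: 2},
	}
	orderTier(jobs)
	for _, j := range jobs {
		fmt.Println(j.fairnessKey) // tenant-c, tenant-a, tenant-b
	}
}
```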
Includes design doc (README.md), flow diagram (scheduler.svg), unit tests, and a soak test.